Agenda

Announcements

  • Ch 17 programming notebook not assigned, but highly recommended
  • several steps take some non-trivial configuration that I don’t want to require per se

MDSR Ch 17: Towards big data

Biggest of the big… (for perspective)

What happens when data start getting big (in R)

Memory limits in R

Strategies when data get big

If things are just slow…

Recall the teller simulation:


any_active <- function(df) {
  # return TRUE if someone has not finished
  return(max(df$endtime) == Inf)
}

next_customer <- function(df) {
  # returns the next customer in line
  res <- filter(df, endtime == Inf) %>%
    arrange(arrival)
  return(head(res, 1))
}

update_customer <- function(df, cust_num, end_time) {
  # sets the end time of a specific customer
  return(mutate(df, endtime = ifelse(custnum == cust_num, end_time, endtime)))
}


teller_sim <- function(n = 1/2, m = 3/2, hours = 6) {
  # simulation of bank where there is just one teller
  # n: expected number of customers per minute
  # m: expected length of transaction is m minutes
  # hours: bank open for this many hours
  
  customers <- rpois(hours * 60, lambda = n)
  arrival <- numeric(sum(customers))
  position <- 1
  for (i in 1:length(customers)) {
    numcust <- customers[i]
    if (numcust != 0) {
      arrival[position:(position + numcust - 1)] <- rep(i, numcust)
      position <- position + numcust
    }
  }
  duration <- rexp(length(arrival), rate = 1/m)  # E[X]=m
  df <- data.frame(arrival, duration, custnum = 1:length(duration), 
                   endtime = Inf, stringsAsFactors = FALSE)
  
  endtime <- 0 # set up beginning of simulation
  while (any_active(df)) { # anyone left to serve
    next_one <- next_customer(df)
    now <- ifelse(next_one$arrival >= endtime, next_one$arrival, endtime)
    endtime <- now + next_one$duration
    df <- update_customer(df, next_one$custnum, endtime)
  }
  df <- mutate(df, totaltime = endtime - arrival)
  return(favstats(~ totaltime, data = df))
}

Profiling the teller simulation

Rprof("TellerSimProfile")
teller_sim()
Rprof(NULL)
head(summaryRprof("TellerSimProfile")$by.self, 20)

STILL slow? Try biglm

require(biglm)

n <- 20000
p <- 500
d <- as.data.frame(matrix(rnorm(n * (p + 1)), ncol = (p + 1)))
expl_vars <- paste(paste0("V", 2:(p+1)), collapse = " + ")
my_formula <- as.formula(paste("V1 ~ ", expl_vars))

# profile `lm` vs `biglm`
system.time(lm(my_formula, data = d))
   user  system elapsed 
  5.779   0.251   6.131 
system.time(biglm(my_formula, data = d))
   user  system elapsed 
  3.777   0.182   4.002 

Next step: parallel processing

my_cores <- detectCores()
my_cores
[1] 4
k <- 5

# without parallel processing
system.time(lapply(1:k, teller_sim))
   user  system elapsed 
 12.102   0.258  12.565 
# parallelize with 3 cores
system.time(mclapply(1:k, teller_sim, mc.cores = my_cores - 1))
   user  system elapsed 
 11.942   0.933  12.079 

MapReduce (parallelization that’s not embarassing?)

Hadoop & Spark

Interface with Spark

require(sparklyr)
# spark_install()   # only once per machine

Interface with Spark

# modify master to connect to a remote Spark cluster
sc <- spark_connect(master = "local")
* Using Spark: 2.4.0
class(sc)
[1] "spark_connection"       "spark_shell_connection" "DBIConnection"         
babynames_tbl <- 
  sc %>%
  copy_to(babynames::babynames, "babynames", overwrite = TRUE)

class(babynames_tbl)
[1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"      

Counting Matthews

babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()
Missing values are always removed in SQL.
Use `SUM(x, na.rm = TRUE)` to silence this warning
This warning is displayed only once per session.

From dplyr to SQL

q <- 
  babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()

q
show_query(q)
<SQL>
SELECT `year`, count(*) AS `N`, SUM(`n`) AS `total_births`
FROM `babynames`
WHERE (`name` = "Matthew")
GROUP BY `year`
ORDER BY `total_births` DESC
LIMIT 6

Querying the Spark cluster

require(DBI)

dbGetQuery(conn = sc, statement = "
                         SELECT year, sum(1) as N, sum(n) as total_births
                         FROM babynames
                         WHERE name == 'Matthew'
                         GROUP BY year
                         ORDER BY total_births desc
                         LIMIT 6
                         ")

Modeling with Spark

require(macleish)

weather_tbl <- copy_to(sc, whately_2015, overwrite = TRUE)

weather_tbl %>%
  sparklyr::ml_linear_regression(rainfall ~ temperature + pressure + rel_humidity) %>%
  summary()
Deviance Residuals:
      Min        1Q    Median        3Q       Max 
-0.041290 -0.021761 -0.011632 -0.000576 15.968356 

Coefficients:
 (Intercept)  temperature     pressure rel_humidity 
   0.7177542    0.0004089   -0.0007545    0.0004377 

R-Squared: 0.004824
Root Mean Squared Error: 0.1982

Alternatives to SQL (Google BigQuery)

require(bigrquery)

project_id <- "stat-380-class-demo"   # Beckman's google cloud project ID

sql <- "SELECT word, count(distinct corpus) as numPlays, sum(word_count) as N
        FROM [publicdata:samples.shakespeare]
        GROUP BY word
        ORDER BY N desc
        LIMIT 10
        "
query_exec(query = sql, project = project_id)
Waiting for authentication in browser...
Press Esc/Ctrl + C to abort
Authentication complete.

Running job -:  1s:
Running job \:  2s:
Running job |:  2s:
Running job /:  3s:
Running job -:  3s:
Running job \:  3s:
Running job |:  4s:
Running job /:  4s:
Running job -:  5s:
Running job \:  5s:
Running job |:  5s:
Running job /:  5s:
                                                                               
10.0 megabytes processed

Rcpp

require(Rcpp)

# write a simple function in C++

cppFunction('int addemup(int x, int y, int z) {
  int sum = x + y + z;
  return sum;
}')


# R recognizes `addemup` like any other function
addemup
function (x, y, z) 
.Call(<pointer: 0x1176de8f0>, x, y, z)
addemup(2, 4, 6)
[1] 12

Stan

parameters {
  real y[2];
}
model {
  y[1] ~ normal(0, 1);
  y[2] ~ double_exponential(0, 2);
}
Loading required package: sp
library(rstan)
Loading required package: StanHeaders
rstan (Version 2.19.2, GitRev: 2e1f913d3ca3)
For execution on a local, multicore CPU with excess RAM we recommend calling
options(mc.cores = parallel::detectCores()).
To avoid recompilation of unchanged Stan programs, we recommend calling
rstan_options(auto_write = TRUE)

Attaching package: ‘rstan’

The following object is masked from ‘package:tidyr’:

    extract
fit <- sampling(ex1, cores = 3)
starting worker pid=25541 on localhost:11849 at 12:27:28.837
starting worker pid=25555 on localhost:11849 at 12:27:29.119
starting worker pid=25569 on localhost:11849 at 12:27:29.379

SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 1).
Chain 1: 
Chain 1: Gradient evaluation took 1.1e-05 seconds
Chain 1: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 1: Adjust your expectations accordingly!
Chain 1: 
Chain 1: 
Chain 1: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 1: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 1: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 1: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 1: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 1: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 1: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 1: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 1: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 1: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 1: Iteration: 1800 / 2000 [ 90%]  (Sampling)

SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 2).
Chain 1: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 1: 
Chain 1:  Elapsed Time: 0.032539 seconds (Warm-up)
Chain 1:                0.031726 seconds (Sampling)
Chain 1:                0.064265 seconds (Total)
Chain 1: 
Chain 2: 
Chain 2: Gradient evaluation took 1.1e-05 seconds
Chain 2: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 2: Adjust your expectations accordingly!
Chain 2: 
Chain 2: 
Chain 2: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 2: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 2: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 2: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 2: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 2: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 2: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 2: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 3).
2: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 3: 
Chain 3: Gradient evaluation took 9e-06 seconds
Chain 3: 1000 transitions using 10 leapfrog steps per transition would take 0.09 seconds.
Chain 3: Adjust your expectations accordingly!
Chain 3: 
Chain 3: 
Chain 3: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 2: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 2: Iteration: 1800 / 2000 [ 90%]  (Sampling)
Chain 2: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 2: 
Chain 2:  Elapsed Time: 0.033218 seconds (Warm-up)
Chain 2:                0.029568 seconds (Sampling)
Chain 2:                0.062786 seconds (Total)
Chain 2: 
Chain 3: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 3: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 3: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 3: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 3: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 3: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 3: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 3: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 3: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 3: Iteration: 1800 / 2000 [ 90%]  (Sampling)
Chain 3: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 3: 
Chain 3:  Elapsed Time: 0.033801 seconds (Warm-up)
Chain 3:                0.032489 seconds (Sampling)
Chain 3:                0.06629 seconds (Total)
Chain 3: 

SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 4).
Chain 4: 
Chain 4: Gradient evaluation took 6e-06 seconds
Chain 4: 1000 transitions using 10 leapfrog steps per transition would take 0.06 seconds.
Chain 4: Adjust your expectations accordingly!
Chain 4: 
Chain 4: 
Chain 4: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 4: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 4: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 4: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 4: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 4: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 4: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 4: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 4: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 4: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 4: Iteration: 1800 / 2000 [ 90%]  (Sampling)
Chain 4: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 4: 
Chain 4:  Elapsed Time: 0.019408 seconds (Warm-up)
Chain 4:                0.016486 seconds (Sampling)
Chain 4:                0.035894 seconds (Total)
Chain 4: 
print(fit)
Inference for Stan model: stan-5a7c20ea0b45.
4 chains, each with iter=2000; warmup=1000; thin=1; 
post-warmup draws per chain=1000, total post-warmup draws=4000.

      mean se_mean   sd  2.5%   25%   50%   75% 97.5% n_eff Rhat
y[1] -0.02    0.02 1.02 -2.01 -0.72 -0.01  0.67  2.01  1897    1
y[2]  0.10    0.06 2.79 -5.72 -1.28  0.06  1.47  6.03  2006    1
lp__ -1.50    0.03 1.22 -4.61 -2.06 -1.22 -0.62 -0.10  1329    1

Samples were drawn using NUTS(diag_e) at Wed Oct  2 12:27:32 2019.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at 
convergence, Rhat=1).
# posterior
stan_plot(fit, point_est = "mean", show_density = TRUE, fill_color = "dodgerblue")
ci_level: 0.8 (80% intervals)
outer_level: 0.95 (95% intervals)

# trace
stan_trace(fit) +
  scale_color_manual(values = c("red", "blue", "green", "black"))
Scale for 'colour' is already present. Adding another scale for 'colour',
which will replace the existing scale.

Python in R Markdown

https://rstudio.github.io/reticulate/#python-in-r-markdown

reticulate & R Notebooks

Create object in R code chunk

require(reticulate)
dat <- c(180, 215, 210, 210, 188, 176, 209, 200)

Manipulate r.dat in Python code chunk

198.5

Access py$avg object in R code chunk

py$avg
[1] 198.5
---
title: "Extending R to accommodate 'big data' and interface with other software technologies"
subtitle: "MDSR Ch 17--Towards Big Data"
output: 
  slidy_presentation: default
  html_notebook: default  
---


```{r Front Matter, echo=TRUE, message=FALSE, warning=FALSE, include=FALSE}
# clean up R environment
rm(list = ls())

# global options
knitr::opts_chunk$set(eval=TRUE, include=TRUE)
options(digits=4)

# load all packages here
library(mdsr)
library(tidyverse)

library(biglm)
library(bigrquery)
library(DBI)
library(macleish)
library(parallel)
library(Rcpp)
library(reticulate)
library(rstan)
library(sparklyr)


# inputs summary

```


# Agenda


#### Announcements

- Ch 17 programming notebook not assigned, but *highly* recommended
- several steps take some non-trivial configuration that I don't want to require per se


# MDSR Ch 17: Towards big data

- 3 V's of big data
    - Volume
    - Variety
    - Velocity
- "big data is when your workflow breaks" --Randy Pruim
- how big is "big" is relative
    - pencil/paper workflow: 30 rows & 3 columns is "big"
    - TI 83 workflow: larger than 99 rows (or columns) is "big"
    - MS Excel workflow: larger than any of the following constraints is "big"
        - 1,048,576 rows ($2^20$)
        - 16,384 columns ($2^14$)
        - 255 characters in a column ($2^8 - 1$)
    - Laptop running R: large with respect to available memory


# Biggest of the big... (for perspective)

- the Large Hadron Collider in Geneva generates 25 petabytes of data per year
    - one petabyte is a million gigabytes
    - all the workflows are broken
    - they actually only save 0.001% of the data generated
- Others just for fun ([based on 2018 Forbes article](https://www.forbes.com/sites/bernardmarr/2018/05/21/how-much-data-do-we-create-every-day-the-mind-blowing-stats-everyone-should-read/#7ef508c760ba))
    - Google processes 40,000 searches *per second*
    - 300 million photos uploaded to Facebook per day
    - The Weather Channel receives 18,055,556 forecast requests *per minute*

![https://www.iflscience.com/technology/how-much-data-does-the-world-generate-every-minute/](2018bigData.png){width=95%}



# What happens when data start getting big (in R)

- The data may not load into memory
- Analyzing the data may take a (really) long time
- Visualizations get messy

# Sidebar about R vs SAS for large data sets

- SAS allocates memory dynamically to keep data on disk (by default)
- R loads all data into memory (by default)
- bottom line: 
    - by default, SAS handles very large datasets better... it "just works"
    - but don't use the R default!

# Memory limits in R

- If you're running 32-bit R on any OS, it'll be 2 or 3 GB
    - note: 2GB of memory used by R is not the same as 2GB on disk
    - Overhead for R to keep track of your data
    - Memory used for analysis, etc.
    - Probably not more than about 500MB on disk?
- If you're running 64-bit R on a 64-bit OS, the upper limit should be infinite (but it's not)
    - Package `bigmemory`
        - "Manage massive matrices with shared memory and memory-mapped files"
        - use in parallel environments can provide substantial speed and memory efficiencies
        - uses a pointer to a C++ data structure
        - (con) matrix data structure requires homogeneity 
    - Package `ff`
        - "Memory-Efficient Storage of Large Data on Disk and Fast Access Functions"
        - provides data structures that are stored on disk but behave (almost) as if they were in RAM
        - uses a pointer to a flat binary file stored on disk, and it can be shared across different sessions
        - permits heterogeneous data structures 


#  Strategies when data get big

- Make the data smaller 
- Get a bigger computer 
- Access the data differently
- Split up the dataset for analysis


# If things are just slow...

- time your code
    - one-liner: 
    ```
        system.time(<call>)
    ```
    - alternative: 
    ```
        start_time <- proc.time()
        <call>
        proc.time() – start_time
    ```
- profile your code to find out which specific operations are slowing you down
- small changes (e.g., vectorize to avoid a loop) can have huge benefit


#### Recall the teller simulation: 

```{r}

any_active <- function(df) {
  # return TRUE if someone has not finished
  return(max(df$endtime) == Inf)
}

next_customer <- function(df) {
  # returns the next customer in line
  res <- filter(df, endtime == Inf) %>%
    arrange(arrival)
  return(head(res, 1))
}

update_customer <- function(df, cust_num, end_time) {
  # sets the end time of a specific customer
  return(mutate(df, endtime = ifelse(custnum == cust_num, end_time, endtime)))
}


teller_sim <- function(n = 1/2, m = 3/2, hours = 6) {
  # simulation of bank where there is just one teller
  # n: expected number of customers per minute
  # m: expected length of transaction is m minutes
  # hours: bank open for this many hours
  
  customers <- rpois(hours * 60, lambda = n)
  arrival <- numeric(sum(customers))
  position <- 1
  for (i in 1:length(customers)) {
    numcust <- customers[i]
    if (numcust != 0) {
      arrival[position:(position + numcust - 1)] <- rep(i, numcust)
      position <- position + numcust
    }
  }
  duration <- rexp(length(arrival), rate = 1/m)  # E[X]=m
  df <- data.frame(arrival, duration, custnum = 1:length(duration), 
                   endtime = Inf, stringsAsFactors = FALSE)
  
  endtime <- 0 # set up beginning of simulation
  while (any_active(df)) { # anyone left to serve
    next_one <- next_customer(df)
    now <- ifelse(next_one$arrival >= endtime, next_one$arrival, endtime)
    endtime <- now + next_one$duration
    df <- update_customer(df, next_one$custnum, endtime)
  }
  df <- mutate(df, totaltime = endtime - arrival)
  return(favstats(~ totaltime, data = df))
}

```


# Profiling the teller simulation 

- let's see where the time goes...
- not bad in this case, but you could discover interesting things... like
    - maybe some modeling function defaults to bootstrap confidence intervals that you don’t care about with 1000 iterations per model
    - you did something for every line of your huge data frame and then combine results using `c()` or `rbind()` rather than assigning to a preallocated vector or matrix


```{r}
Rprof("TellerSimProfile")
teller_sim()
Rprof(NULL)
head(summaryRprof("TellerSimProfile")$by.self, 20)
```


# STILL slow? Try `biglm`

- `biglm` package has an efficient alternative to the `lm` function
- it can even fit generalized linear models (regression & logistic regression) with data frames that are larger than memory

```{r}
require(biglm)

n <- 20000
p <- 500
d <- as.data.frame(matrix(rnorm(n * (p + 1)), ncol = (p + 1)))
expl_vars <- paste(paste0("V", 2:(p+1)), collapse = " + ")
my_formula <- as.formula(paste("V1 ~ ", expl_vars))

# profile `lm` vs `biglm`
system.time(lm(my_formula, data = d))

system.time(biglm(my_formula, data = d))
```


# Next step: parallel processing

- **Parallel processing** is basically farming out subtasks to independent processors, then merging results
- effectively just allocates more RAM for the problem

```{r}
my_cores <- detectCores()
my_cores
```


- **embarassingly parallel computing**
    - I need to repeat the same task many times
    - order of implementation doesn't matter
- I have 4 total cores, but you should always save one for your operating system
- e.g., comparison for several iterations of teller simulation

```{r}
k <- 5

# without parallel processing
system.time(lapply(1:k, teller_sim))

# parallelize with 3 cores
system.time(mclapply(1:k, teller_sim, mc.cores = my_cores - 1))
```


# MapReduce (parallelization that's not embarassing?)

- programming paradigm for parallel computing
    - two phase algorithm
    - **map**--farm out parallizeable task to many machines
    - **reduce**--combine results
- tricky part: **you** have to define the `map` function and the `reduce` function
- needs software implementation
    - Hadoop
    - Spark

# Hadoop & Spark

- Hadoop was first to really tackle MapReduce 
- Hadoop MapReduce has been superseded by Spark, 
    - tools that emerged as the "ecosystem" around it are still popular (HDFS)
    - "legacy" projects might still use Hadoop MapReduce
- Apache Spark is considered superior for a few reasons
    - had the benefit of implementing lessons learned from Hadoop
    - keep the good--HDFS (Hadoop Distributed File System) for disk storage
    - improve the weaknesses--prioritize RAM rather than disk storage whenever possible


# Interface with Spark

- Spark provides provides an interface for programming entire clusters
- a **computer cluster** is a set of connected computers that work together as a single system
- the `sparklyr` package in R makes it easy to 
    - install a local Spark cluster (from within R)
    - connect to a local or remote cluster

```{r eval=FALSE}
require(sparklyr)
# spark_install()   # only once per machine
```


# Interface with Spark


```{r}
# modify master to connect to a remote Spark cluster
sc <- spark_connect(master = "local")
class(sc)
```


```{r}
babynames_tbl <- 
  sc %>%
  copy_to(babynames::babynames, "babynames", overwrite = TRUE)

class(babynames_tbl)
```


# Counting Matthews

```{r}
babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()
```


# From `dplyr` to SQL

- whenever `dplyr` meets an object with class `tbl_sql` (like `babynames_tbl`), `dplyr` **automatically** translates the R pipeline into **SQL**
- **SQL** (structured query language) is a widely used language for querying relational databases, among other purposes
- Queries in SQL start with the `SELECT` keyword and consist several clauses which **must to be written in order**; basically
    - `SELECT` (**required**)--like `select()` in `dplyr` (and possibly combined with `mutate()`)
    - `FROM` (**required**)--like table before the first `%>%` in `dplyr`
    - `JOIN`--like `join()` verbs in `dplyr`
    - `WHERE`--like `filter()` verb in `dplyr`
    - `GROUP BY`--like `group_by()` verb in `dplyr`
    - `HAVING`--like using a second `filter()` in `dplyr` after the rows have already been 
    - `ORDER BY`--like `arrange()` verb in `dplyr` 
    - `LIMIT`--sort of like `head()` but more versatile
- For example, let's revisit our previous `dplyr` query

```{r}
q <- 
  babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()

q
```

```{r}
show_query(q)
```

# Querying the Spark cluster

- Spark is a parallelized technology designed to supersede SQL, but it's still useful to know SQL in order to use Spark
- here, we'll query the Spark cluster using the connection we've defined `sc` with the SQL statement equivalent to our `dplyr` wrangling

```{r}
require(DBI)

dbGetQuery(conn = sc, statement = "
                         SELECT year, sum(1) as N, sum(n) as total_births
                         FROM babynames
                         WHERE name == 'Matthew'
                         GROUP BY year
                         ORDER BY total_births desc
                         LIMIT 6
                         ")
```


# Modeling with Spark

- `whately_2015` has some weather data from Massachusetts (in the `macleish` package)
- Spark has a machine learning library which includes many of the supervised/unsupervised learning tools we've discussed this semester
- let's use Spark to fit a multiple regression model as an example

```{r}
require(macleish)

weather_tbl <- copy_to(sc, whately_2015, overwrite = TRUE)

weather_tbl %>%
  sparklyr::ml_linear_regression(rainfall ~ temperature + pressure + rel_humidity) %>%
  summary()

```




# Alternatives to SQL (Google BigQuery)


```{r}
require(bigrquery)

project_id <- "stat-380-class-demo"   # Beckman's google cloud project ID

sql <- "SELECT word, count(distinct corpus) as numPlays, sum(word_count) as N
        FROM [publicdata:samples.shakespeare]
        GROUP BY word
        ORDER BY N desc
        LIMIT 10
        "
query_exec(query = sql, project = project_id)

```


# `Rcpp`

- `RCPP::cppFunction()` allows you to write C++ functions in R
- `Rcpp::sourceCpp()` loads a C++ file from disk in the same way you use source() to load a file of R code. 
- Rcpp will compile the C++ code and construct an R function that connects to the compiled C++ function
- more here: <http://adv-r.had.co.nz/Rcpp.html>


```{r}
require(Rcpp)

# write a simple function in C++

cppFunction('int addemup(int x, int y, int z) {
  int sum = x + y + z;
  return sum;
}')


# R recognizes `addemup` like any other function
addemup
```

```{r}
addemup(2, 4, 6)
```



# Stan

- Bayesian statistical inference with MCMC sampling
- Stan model within the code chunk is compiled into a "stanmodel" object 
- result assigned to a variable with the name given by the `output.var` option


```{stan, output.var="ex1"}
parameters {
  real y[2];
}
model {
  y[1] ~ normal(0, 1);
  y[2] ~ double_exponential(0, 2);
}
```


```{r}
library(rstan)
fit <- sampling(ex1, cores = 3)
print(fit)
```

```{r}
# posterior
stan_plot(fit, point_est = "mean", show_density = TRUE, fill_color = "dodgerblue")

# trace
stan_trace(fit) +
  scale_color_manual(values = c("red", "blue", "green", "black"))

```


# Python in R Markdown

<https://rstudio.github.io/reticulate/#python-in-r-markdown>

- `reticulate` package includes a Python engine for R Markdown:
    - Run Python chunks in a single Python session embedded within your R session (shared variables/state between Python chunks)
    - Printing of Python output, including graphical output from `matplotlib`.
    - Access to objects created within Python chunks from R using the `py` object (e.g. `py$x` would access an `x` variable created within Python from R).
    - Access to objects created within R chunks from Python using the `r` object (e.g. `r.x` would access to `x` variable created within R from Python)
- Built in conversion for many Python object types, including `NumPy` arrays and `Pandas` data frames.


# `reticulate` & R Notebooks

- `reticulate` package does make it pretty easy to go back and forth between R & Python objects
- it "just works" when you compile RMarkdown 
- the R Notebook isn't yet ironed out in the current production release (v1.1)
    - preview will throw errors, even when the code is correct
    - R Notebook is no better than preview
    - This will be fixed in RStudio v1.2 [(install preview here...)](https://www.rstudio.com/products/rstudio/download/preview/)  
        - you should install it (after the semester)
        - I generally don't recommend major updates during the semester if you can avoid it


#### Create object in R code chunk

```{r}
require(reticulate)
dat <- c(180, 215, 210, 210, 188, 176, 209, 200)
```


#### Manipulate `r.dat` in Python code chunk

```{python}
# Import the numpy package
import numpy

# Create a Numpy array from data: np_data
np_data = numpy.array(r.dat)

# Print out mean of np_data
avg = numpy.mean(np_data)

print(avg)
```

#### Access `py$avg` object in R code chunk

```{r}
py$avg
```







